package groupOne.app.DWD.db;


import groupOne.app.BaseAppSql;
import groupOne.util.SQLUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Dwd_02_DwdInteractionFavorAdd_LiuWei extends BaseAppSql {
    public static void main(String[] args) {
        new Dwd_02_DwdInteractionFavorAdd_LiuWei().init(3002,2,"Dwd_02_DwdInteractionFavorAdd_LiuWei");
    }

    @Override
    protected void handle(StreamExecutionEnvironment env, StreamTableEnvironment tEnv) {
       readOdsDb(tEnv, "Dwd_02_DwdInteractionFavorAdd_LiuWei");

        Table favorInfo =tEnv.sqlQuery("select " +
               "data['id'] id, " +
               "data['user_id'] user_id, " +
               "data['sku_id'] sku_id, " +
               "date_format(data['create_time'],'yyyy-MM-dd') date_id, " +
               "data['create_time'] create_time, " +
               "ts " +
               "from ods_db " +
               "where `database`='gmall2022' " +
               "and `table`='favor_info' " +
               "and `type` = 'insert' ");
        tEnv.createTemporaryView("favor_info",favorInfo);

        tEnv.executeSql("create table dwd_interaction_favor_add ( " +
                "id string, " +
                "user_id string, " +
                "sku_id string, " +
                "date_id string, " +
                "create_time string, " +
                "ts string " +
                ")" + SQLUtil.getKafkaSink("dwd_interaction_favor_add_LiuWei"));

        tEnv.executeSql("" +
                "insert into dwd_interaction_favor_add_LiuWei select * from favor_info");
    }
}



